[SPARK-44856][PYTHON] Improve Python UDTF Arrow serializer performance #50099
HyukjinKwon wants to merge 2 commits into apache:master
Conversation
Why do we need to check both error classes?
The problem is that, previously, we were able to create a pandas DataFrame with a different schema, but Arrow does not seem to allow it.
More specifically, the previous code path:

    yield verify_result(
        pd.DataFrame(check_return_value(res))
    ), arrow_return_type

did not throw an error at pd.DataFrame(...).
However, the new code path:

    convert_to_arrow(func())
    ret = LocalDataToArrowConversion.convert(
        data, return_type, prefers_large_var_types
    ).to_batches()

throws an error at LocalDataToArrowConversion.convert(...).
We could further improve this, but I would prefer to keep that out of scope for this PR, considering that there are already behavior differences with/without Arrow.
Let me add a legacy conf ... to be safer ..
Could you fix the remaining failures?
dongjoon-hyun left a comment
+1, LGTM. Thank you again for making it work, @HyukjinKwon .
python/pyspark/worker.py
Can we move this to before the above line?
Actually no .. otherwise, the tests fail. There is a test that throws an exception when the batch is empty. LocalDataToArrowConversion.convert checks whether the data is empty, and the exception would be wrapped by PySparkRuntimeError.
python/pyspark/worker.py
Can this return multiple batches? What happens in that case?
Yeah, if it grows over the default size (https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Dataset.html#pyarrow.dataset.Dataset.to_batches), it can be multiple batches. It should work, though - I wrote the code so that it works via ArrowStreamUDFSerializer.dump_stream.
python/pyspark/worker.py
IIRC, it must return exactly one batch per input row. convert_to_arrow should always return one batch. cc @allisonwang-db
From what I read of the code, it seems fine... but it would be great if we could confirm this.
For direct UDTF usage like MyUDTF(lit(1), lit(2)), it's fine to return multiple batches, but for lateral joins like SELECT * FROM t, LATERAL MyUDTF(a, b), we must match each input row with all output of the UDTF for that row. If we return multiple batches, then we can't distinguish which batch to join with which input row.
Just checked: https://github.com/apache/arrow/blob/d2ddee62329eb711572b4d71d6380673d7f7edd1/cpp/src/arrow/table.cc#L612-L638
The batch size will be long max by default, which I believe is pretty safe. An Arrow batch cannot contain more rows than fit in a long in any way.
Co-authored-by: Allison Wang <allison.wang@databricks.com>
Merged to master.
…rk Connect compatibility test

### What changes were proposed in this pull request?
This PR proposes to skip ArrowUDTFParityTests in the Spark Connect compatibility test for now.

### Why are the changes needed?
After #50099, the compatibility test fails: https://github.com/apache/spark/actions/runs/14959668798/job/42019945629
In fact, UDTF with Arrow is still under development, so we can skip the tests for now.

### Does this PR introduce _any_ user-facing change?
No, test-only.

### How was this patch tested?
Will monitor the build.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #50856 from HyukjinKwon/SPARK-44856-followup.
Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR removes the pandas <> Arrow <> pandas conversion in Arrow-optimized Python UDTFs by directly using PyArrow.

### Why are the changes needed?
Currently, there is a lot of overhead in the Arrow serializer for Python UDTFs. The overhead is largely from converting Arrow batches into pandas Series and converting the UDTF's results back to a pandas DataFrame. We should try directly converting Python objects into Arrow and vice versa to avoid the expensive pandas conversion.

### Does this PR introduce _any_ user-facing change?
Yes. Previously the conversion was

### How was this patch tested?
Existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#50099 from HyukjinKwon/SPARK-44856.
Lead-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
What changes were proposed in this pull request?
This PR removes the pandas <> Arrow <> pandas conversion in Arrow-optimized Python UDTFs by directly using PyArrow.
Why are the changes needed?
Currently, there is a lot of overhead in the Arrow serializer for Python UDTFs. The overhead is largely from converting Arrow batches into pandas Series and converting the UDTF's results back to a pandas DataFrame.
We should try directly converting Python objects into Arrow and vice versa to avoid the expensive pandas conversion.
Does this PR introduce any user-facing change?
Yes. Previously the conversion was
How was this patch tested?
Existing tests.
Was this patch authored or co-authored using generative AI tooling?
No.